package com.ttx.transform.validate.common.validate;

import cn.hutool.core.lang.Assert;
import cn.hutool.crypto.SecureUtil;
import com.alibaba.fastjson.JSON;
import com.ttx.transform.validate.common.mapper.CommonMapper;
import com.ttx.transform.validate.common.model.dataobject.ShardingKeyDO;
import com.ttx.transform.validate.common.util.JsonUtil;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisCursorItemReader;
import org.slf4j.Logger;
import org.springframework.batch.item.ExecutionContext;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 *
 * 数据库校验， 源库  和 目的库的比较， 如果目的库不存在源库的数据， 会插入源库数据
 *
 * 如果目的库数据和源库数据不一样， 就会使用源库数据更新目的数据库
 *
 *
 * 数据量过大，可以考虑随机校验
 * @author TimFruit
 * @date 20-4-22 上午11:27
 */
public class CommonDBValidator implements IValidator{
    protected Logger log;

    //校验器区分标签
    protected String tag;

    protected Long startId;

    protected Long endId;

    protected Long timeSleep;

    //步长
    protected Long stepLen;

    //最大检验次数
    private Integer maxValidateCount;


    //源库
    protected CommonMapper sourceMapper;
    protected Class<? extends CommonMapper> sourceMapperClass;
    protected String listEntitiesByRangeQueryId;
    protected SqlSessionFactory sourceSqlSessionFactory;//单库
    //源库标签
    protected  String sourceTag;


    //目的库
    protected CommonMapper destMapper;
    protected  String destTag;





    //通知停止校验
    protected volatile boolean shoutdownSignal;



    public CommonDBValidator(ValidateConfig config) {
        checkValidateDto(config);

        log=config.getLog();
        tag= config.getTag();
        startId=config.getStartId();
        endId=config.getEndId();
        stepLen=config.getStepLen();
        maxValidateCount=config.getMaxValidateCount();
        timeSleep=config.getTimeSleep();

        sourceTag=config.getSourceTag();
        sourceMapper =config.getSourceMapper();
        sourceSqlSessionFactory =config.getSourceSqlSessionFactory();
        sourceMapperClass =config.getSourceMapperClass();
        listEntitiesByRangeQueryId= sourceMapperClass.getName()+".listEntitiesByRange";

        destMapper =config.getDestMapper();
        destTag=config.getDestTag();


    }


    protected void checkValidateDto(ValidateConfig config){
        Assert.notEmpty(config.getTag(), "[tag]不能为空");
        Assert.notNull(config.getLog(), "[log]不能为空");
        Assert.notNull(config.getSourceMapper(), "[SourceMapper]不能为空");
        Assert.notNull(config.getSourceMapperClass(), "[SourceMapperClass]不能为空");
        Assert.notNull(config.getSourceSqlSessionFactory(), "[SourceSqlSessionFactory]不能为空");
        Assert.notNull(config.getSourceTag(), "[SourceTag]不能为空");


        Assert.notNull(config.getDestMapper(), "[DestMapper]不能为空");
        Assert.notNull(config.getDestTag(), "[DestTag]不能为空");

        Assert.notNull(config.getStepLen(), "[StepLen]不能为空");
        Assert.notNull(config.getMaxValidateCount(), "[MaxValidateCount]不能为空");
//        Assert.notNull(config.getStartId(), "[StartId]不能为空");
//        Assert.notNull(config.getEndId(), "[EndId]不能为空");
        Assert.notNull(config.getTimeSleep(), "[timeSleep]不能为空");
    }




    public void validate(){
        log.info("==========================");
        log.info("根据 source: {} 校验 dest: {}", sourceTag, destTag);
        log.info("==========================");
        Long innerStartId= sourceMapper.selectMinIdInRange(startId,endId);
        Long innerEndId= sourceMapper.selectMaxIdInRange(startId,endId);
        innerStartId=(innerStartId==null)?0:innerStartId;
        innerEndId=(innerEndId==null)?0:innerEndId;

        log.info("tag: {}", tag);
        log.info("startId: {}", startId);
        log.info("endId: {}", endId);
        log.info("innerStartId: {}", innerStartId);
        log.info("innerEndId: {}", innerEndId);
        log.info("stepLen: {}", stepLen);
        log.info("maxValidateCount: {}", maxValidateCount);
        log.info("timeSleep: {}", timeSleep);
        log.info("==========================");


        MyBatisCursorItemReader cursorReader=null;

        int validateCount=0;
        try {
            while(!shoutdownSignal && true){
                validateCount++;

                Long sourceCount= sourceMapper.countEntities(innerStartId,innerEndId);
                Long destCount=destMapper.countEntities(innerStartId,innerEndId);

                log.info("tag:{}, innerStartId: {}, innerEndId:{}  source:{} 数据量为 {}",
                        tag,innerStartId, innerEndId, sourceTag, sourceCount);
                log.info("tag:{}, innerStartId: {}, innerEndId:{}  dest:{} 数据量为 {}",
                        tag,innerStartId, innerEndId, destTag, destCount);

                if(sourceCount<destCount){
                    log.warn("source:{} 数据量 小于 dest: {}", sourceTag, destTag);
                }

                if(sourceCount.longValue()==0){
                    log.info("检验完{}数据", sourceTag);
                    sleep();
                    continue;
                }else {
                    log.info("对比...");


                    Object tempSingleDO=null;


                    Long endId;
                    for(long id=(innerStartId-1); !shoutdownSignal && id<=innerEndId;id+=stepLen){
                        endId=id+stepLen;
                        log.info("tag:{}, 检验({},{}] ...", tag, id, endId);
                        cursorReader=listEntityCursorReader(id, endId);

                        while(!shoutdownSignal && (tempSingleDO=cursorReader.read())!=null){
                            validateOne(tempSingleDO);
                        }

                        cursorReader.close();

                    }



                }

                log.info("第{}次校验完成, tag:{}, source: {}, dest:{}, startId:{}, endId:{}, innerStartId:{}, innerEndId:{}...",
                        validateCount, tag, sourceTag, destTag, startId, endId, innerStartId, innerEndId);
                if(validateCount>=maxValidateCount){
                    log.info("达到最大检验次数{}, 退出 tag:{}, source: {}, dest:{}, startId:{}, endId:{}, innerStartId:{}, innerEndId:{}",
                            maxValidateCount, tag, sourceTag, destTag, startId, endId, innerStartId, innerEndId);
                    break;
                }
                sleep();

            }
        }catch (Exception e){
            log.error("校验"+tag+"("+sourceTag+"-> "+destTag+")数据出错",e);
        }finally {
            if(cursorReader!=null){
                cursorReader.close();
            }
        }




    }


    //不含beginId
    private MyBatisCursorItemReader listEntityCursorReader(Long beginId, Long endId){
        MyBatisCursorItemReader reader=new MyBatisCursorItemReader();
        reader.setSqlSessionFactory(sourceSqlSessionFactory);
        reader.setQueryId(listEntitiesByRangeQueryId);
        Map<String,Object> params=new HashMap<>();
        params.put("beginId", beginId);
        params.put("endId", endId);

        reader.setParameterValues(params);

        try {
            reader.afterPropertiesSet();
        } catch (Exception e) {
            log.error("",e);
        }

        reader.open(new ExecutionContext());
        return reader;
    }


    protected void validateOne(Object tempSourceDO){
        if(tempSourceDO==null){
            return;
        }

        ShardingKeyDO sourceDO=(ShardingKeyDO)tempSourceDO;

        Object tempDestDO= destMapper.selectEntityByIdAndUserId(sourceDO.getDoId(), sourceDO.getUserId());



        if(tempDestDO==null){
            log.info("tag:{}, dest {} id:{} 对应的数据不存在，将插入source {}数据",
                    tag, destTag, sourceDO.getDoId(), sourceTag);
            destMapper.insertWidthId(sourceDO);
            return;
        }


        ShardingKeyDO destDO=(ShardingKeyDO)tempDestDO;



        int result=sourceDO.getUpdateTime().compareTo(destDO.getUpdateTime());
        // 注意，使用更新时间 比较重要， 如果原数据表没有，可以在单库，和分库，分别添加临时更新时间表来实现
        // 因为由"双写,单库为主"切换到"双写，分库为主" 会相互影响，难以区分， 使用更新时间，可以区分
        if(result>0){//单库数据比较新

            log.info("tag:{}, id:{}  source {} 对应的数据更新时间大于 {} 数据时间",
                    tag, sourceDO.getDoId(), sourceTag, destTag);
            log.info("更新dest {}数据", destTag);
            destMapper.updateByIdAndUserId(sourceDO);


        }else {

            String sourceJson=JsonUtil.format(sourceDO);
            String destJson=JsonUtil.format(destDO);
            String sourceMd5=SecureUtil.md5(sourceJson);
            String destMd5=SecureUtil.md5(destJson);

            if(!sourceMd5.equals(destMd5)){//额外检查，防止意外
                log.warn("tag:{}, id:{} 对应的数据不一样", tag, sourceDO.getDoId());
                log.warn("source {}  tag {}: {}",  sourceTag, tag, sourceJson);
                log.warn("dest {} tag {}: {}", destTag,  tag, destJson);
            }

        }





    }




    public void shutdown(){
        this.shoutdownSignal=true;
    }

    private void sleep() throws InterruptedException {
        log.info("休眠等待{}秒",timeSleep);
        TimeUnit.SECONDS.sleep(timeSleep);
    }
}
